Skip to content

fix: close all memory stream ends in client transport cleanup#2266

Merged
maxisbey merged 9 commits intomainfrom
max-153-stream-leak-fix
Mar 13, 2026
Merged

fix: close all memory stream ends in client transport cleanup#2266
maxisbey merged 9 commits intomainfrom
max-153-stream-leak-fix

Conversation

@maxisbey
Copy link
Contributor

Summary

Client transports for SSE, WebSocket, and StreamableHTTP create 4 anyio memory stream ends (2 paired streams) but only closed 2 in their finally blocks. anyio memory stream ends are independent — closing the writer does not close the reader. Unclosed stream ends leak and emit ResourceWarning when garbage collected.

This caused flaky CI failures: a transport connection error (404, 403, ConnectError) in one test would leak streams, then GC in a later unrelated test would trigger ResourceWarning, which pytest's filterwarnings = ["error"] promotes to a test failure — in whatever test happened to be running when GC fired, not the test that actually leaked.

Fix

Follows the existing correct pattern in stdio.py (which closes all 4 ends on both early-fail and normal-exit paths):

File Before After
sse.py finally closed 2 of 4 finally closes all 4
streamable_http.py finally closed 2 of 4 — read_stream was never closed, even on happy path finally closes all 4
websocket.py No try/finally at all — if ws_connect() raised, all 4 leaked Wrapped entire body in try/finally that closes all 4

anyio's aclose() is idempotent, so double-closing (e.g. when reader/writer tasks already closed their end) is safe.

Tests

Added tests/client/test_transport_stream_cleanup.py with one regression test per transport. Each test triggers the error/exit path, then calls gc.collect() to force any leaked stream to emit ResourceWarning deterministically. All 3 tests fail on main with ResourceWarning: Unclosed <MemoryObjectReceiveStream> and pass with this fix.

CI Evidence of the Flakiness

AI Disclaimer

Client transports for SSE, WebSocket, and StreamableHTTP create 4 memory
stream ends (2 paired streams) but only closed 2 in their finally blocks.
anyio memory stream ends are independent — closing the writer does not
close the reader. The unclosed ends leak and emit ResourceWarning when
garbage collected.

This caused flaky test failures in CI: a transport connection error in
one test would leak streams, then GC in a later unrelated test would
trigger ResourceWarning, which pytest promotes to a test failure.

Fix follows the existing correct pattern in stdio.py:
- sse.py: close all 4 stream ends in the existing finally block
- streamable_http.py: close all 4 stream ends in the existing finally
  block (read_stream was previously never closed, even on happy path)
- websocket.py: add try/finally wrapping the entire body, closing all
  4 stream ends (previously had no cleanup at all — ws_connect failure
  leaked everything)

Regression tests force gc.collect() after the transport context exits
so leaked streams fail deterministically in the test that caused them.
The gc.collect() in these tests was picking up leaked PipeHandles from
flaky stdio tests (TestChildProcessCleanup) on the same xdist worker,
causing false failures on Windows CI.

Now uses a custom sys.unraisablehook that filters for MemoryObject
stream leaks specifically, ignoring unrelated resources leaked by other
tests. Also adds explicit del exc_info in the SSE test since the
traceback would otherwise keep leaked stream locals alive past
gc.collect().
The filterwarnings on test_sse_client_handles_empty_keepalive_pings was
added in PR #1753 with a comment claiming the leak was a test artifact
from mocking aconnect_sse. It wasn't — it was the real sse_client bug
where read_stream and write_stream_reader were never closed in the
finally block. With that bug now fixed, the filter is no longer needed.
@maxisbey maxisbey marked this pull request as ready for review March 12, 2026 18:22
Replace explicit try/finally + aclose() chains with async with on all
4 stream ends. Memory stream context managers are idempotent and have
no checkpoints in __aexit__, so this is semantically identical to the
try/finally form with the same teardown ordering — but the ownership
is stated once at creation time and can't drift.

For websocket_client, also move stream creation inside ws_connect so
a connection failure never creates streams in the first place.

Matches the existing pattern in shared/memory.py.
Copy link

@claude claude bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM — straightforward resource leak fix with solid regression tests.

Extended reasoning...

Overview

This PR fixes a resource leak in three client transports (SSE, StreamableHTTP, WebSocket) where only 2 of 4 anyio memory stream ends were being closed in finally blocks. The fix adds the missing aclose() calls (2 lines each for sse.py and streamable_http.py) and wraps websocket.py in a try/finally (which previously had none). A new test file adds 3 regression tests that force GC and verify no ResourceWarning is emitted, and an existing filterwarnings decorator that was masking the bug is removed from test_sse.py.

Security risks

None. This is pure resource cleanup — adding aclose() calls to stream objects that are already being discarded. No data handling, no auth, no network surface changes.

Level of scrutiny

Low. The changes are mechanical and follow the existing correct pattern in stdio.py (which already closes all 4 stream ends at lines 131-134 and 208-211). The PR description correctly notes that anyio aclose() is idempotent — in fact, sse.py already relied on this before the PR (the sse_reader finally block and the outer finally block both close read_stream_writer), so this is not introducing new idempotence assumptions. The websocket.py change is just indentation plus a try/finally wrapper; the inner logic is unchanged.

Other factors

  • The regression tests are well-designed: they use sys.unraisablehook to capture only MemoryObject stream warnings (explicitly ignoring unrelated leaks from other tests on the same xdist worker), trigger the error/exit paths, and force gc.collect() to make leaks deterministic.
  • The PR description includes CI evidence of the flakiness this fixes, with links to runs where unrelated tests failed due to GC-timing of leaked streams.
  • Removing the filterwarnings decorator from test_sse_client_handles_empty_keepalive_pings demonstrates the fix actually resolves the root cause rather than papering over it.
  • No outstanding reviewer comments, no CODEOWNERS file, no design decisions involved.

…rage

Python 3.11's bytecode for nested async with blocks produces extra
branch arcs that coverage.py tracks but the test suite doesn't
exercise. Merging the stream context-managers with the task group
into a single async with restores the same nesting depth as main,
so branch count stays at 4 instead of 6.

Teardown order is also slightly better this way: tg.__aexit__ runs
first (waits for cancelled tasks), then stream ends close in reverse
order — tasks are fully done before streams close.
The async with form triggers coverage.py false-positive branch arcs
on Python 3.14 — nested multi-CM async with creates WITH_EXCEPT_START
handlers whose POP_JUMP_IF_TRUE branches (did __aexit__ suppress?)
get attributed to inner async with lines via the exception table.
Memory stream __aexit__ never suppresses, so those arcs are
unreachable, but coverage.py's static analysis expects them.

try/finally has no suppression branch (it's unconditional RERAISE),
so it sidesteps the issue entirely. Keeping the explicit aclose()
chain for these two files; websocket.py stays on the merged
async with form since its single-level nesting is clean on all
tested Python versions (3.10-3.14).
The anyio pytest plugin's free_tcp_port fixture tracks allocated
ports within a session so concurrent tests don't step on each other,
which the hand-rolled bind-and-close approach doesn't guarantee.
Comment on lines +163 to +164
await read_stream.aclose()
await write_stream_reader.aclose()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟣 Not blocking — follow-up note. The same pattern this PR fixes exists in the server-side transports: server/stdio.py, server/websocket.py, and server/sse.py each create 4 stream ends but only close 2 (via inner async with in reader/writer tasks), with no outer try/finally — read_stream and write_stream are never closed by the transport itself. Ironically, server/stdio.py doesn't follow the client/stdio.py pattern this PR cites as the correct reference. This likely doesn't cause the current CI flakiness (server tests run in subprocesses, and tests/server/test_stdio.py manually closes both streams via async with), but it's worth a follow-up issue to apply the same fix on the server side for consistency.

Extended reasoning...

Summary

The server-side transport files have the exact same memory-stream leak pattern that this PR fixes on the client side. This is a pre-existing issue, not introduced by this PR, and does not need to block the PR — but it's directly relevant context that a reviewer should know about when approving a fix scoped to half of the affected code.

Affected files and the pattern

I verified each server transport against the code:

src/mcp/server/stdio.py (lines 52–83): Creates 4 stream ends via two create_memory_object_stream calls. The stdin_reader task uses async with read_stream_writer: and stdout_writer uses async with write_stream_reader:, so 2 ends are closed when the tasks complete. But read_stream and write_stream are yielded to the caller and never closed by the transport. There is no outer try/finally.

src/mcp/server/websocket.py (lines 28–58): Identical structure. ws_reader closes read_stream_writer, ws_writer closes write_stream_reader. read_stream and write_stream are never closed. No outer try/finally.

src/mcp/server/sse.py: Creates 6 stream ends (4 + an SSE pair). A subset is closed by inner task bodies; read_stream, write_stream, and sse_stream_reader are never explicitly closed.

Comparison with the "correct" pattern

The PR description cites client/stdio.py as the reference correct pattern. Lines 208–211 of client/stdio.py close all 4 ends in the finally block:

await read_stream.aclose()
await write_stream.aclose()
await read_stream_writer.aclose()
await write_stream_reader.aclose()

Ironically, server/stdio.py — the file most directly parallel to client/stdio.py — does not follow this pattern.

Why this doesn't cause current CI flakiness (addressing the counter-argument)

One might expect server-side leaks to cause the same knock-on ResourceWarning failures the PR describes. I investigated and this is largely not the case in the current test suite, for two reasons:

  1. Subprocess isolation: tests/shared/test_ws.py and tests/shared/test_sse.py run the server via multiprocessing.Process (see the server fixture in tests/shared/test_sse.py around line 119). Leaked memory streams live in the subprocess's memory and die with the subprocess when proc.kill() is called — they never trigger a ResourceWarning in the pytest process.

  2. Manual close in test code: tests/server/test_stdio.py runs stdio_server in-process, but the test explicitly wraps the yielded streams: async with read_stream: (line 30) and async with write_stream: (line 49). So the test compensates for the transport's missing cleanup.

So the CI evidence cited in the PR description (client-side test_basic_resourcestest_tool_progress cascade) is genuinely a client-side problem, and this PR fully addresses the observed flakiness.

Why it's still worth a follow-up

  • Defense in depth: If someone adds an in-process server test that doesn't manually async with the yielded streams, the leak surfaces.
  • API contract: Relying on callers to close streams that the transport created is brittle. The transport owns the resource; it should own the cleanup.
  • Consistency: After this PR, 4 client transports close all stream ends; 3 server transports don't. That asymmetry is a maintenance hazard.

Recommended fix (for the follow-up, not this PR)

Same pattern as this PR — add the missing aclose() calls in a finally block, or use the compound async with approach from client/websocket.py. Since anyio's aclose() is idempotent, double-close is safe when the caller has already closed its end.

The free_tcp_port pytest fixture was added in anyio 4.9.0. The
lowest-direct CI matrix was resolving to 4.5.0, which doesn't
have it. Locked version was already 4.10.0 so no lockfile churn
beyond the constraint metadata.
When unraisablehook fires from __del__, args.object is the __del__
function itself, not the stream instance. The previous check happened
to match because Python's function repr includes the qualname, but the
failure message showed '<function MemoryObjectSendStream.__del__ ...>'
instead of the actual 'Unclosed <MemoryObjectSendStream at 0x...>'
warning. Checking exc_value is semantically correct and gives
actionable output when a regression reintroduces the leak.
@maxisbey maxisbey requested a review from Kludex March 13, 2026 13:28
Copy link

@claude claude bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM — all prior feedback addressed (exc_value fix, anyio>=4.9 bump).

Extended reasoning...

Overview

This PR fixes a real resource leak in 3 client transports (SSE, WebSocket, StreamableHTTP). Each creates 4 anyio memory stream ends but only closed 2 in their cleanup paths. Since anyio stream ends are independent (closing the writer does not close the reader), the other 2 leaked, emitting ResourceWarning on GC and causing nondeterministic CI failures in unrelated tests.

The fix is mechanical: add 2 more aclose() calls to the finally blocks in sse.py and streamable_http.py, and restructure websocket.py to create streams inside the ws_connect context and wrap all 4 in the compound async with alongside the task group. This directly mirrors the existing correct pattern in client/stdio.py (lines 131-134 and 208-211), and aclose() is idempotent so double-close (when reader/writer tasks already closed their end) is safe. The anyio version floor is bumped from 4.5→4.9 to support the free_tcp_port fixture used in the new tests.

Security risks

None. The changes are purely resource-cleanup: adding aclose() calls and reordering stream creation. No auth, crypto, permission, or input-handling logic is touched. The anyio bump is a benign compatibility floor change.

Level of scrutiny

Low-to-medium. The core fix is a 2-line addition per transport following an existing in-repo pattern, with clear regression tests that demonstrably fail on main and pass here. The websocket.py restructure is slightly larger but semantically equivalent and cleaner than a try/finally — moving stream creation inside ws_connect means no streams exist if the connection fails, and the compound async with guarantees all 4 are closed on both normal exit and exception.

Other factors

All three of my prior review comments have been addressed:

  • The args.object vs args.exc_value nit in the test hook → fixed in commit d532923 (now checks str(args.exc_value))
  • The free_tcp_port fixture requiring anyio>=4.9 → fixed in commit 3332cfb (runtime floor bumped; maxisbey explicitly weighed the tradeoff and agronholm reviewed without objection)
  • The server-side follow-up note was explicitly marked non-blocking and remains out of scope

The tests/shared/test_sse.py change (removing the filterwarnings ignore) is a nice validation that the fix actually resolves the root cause rather than papering over it. agronholm (anyio maintainer) and Kludex both engaged with the review; no outstanding unresolved concerns.

@maxisbey maxisbey merged commit e1fd62e into main Mar 13, 2026
38 checks passed
@maxisbey maxisbey deleted the max-153-stream-leak-fix branch March 13, 2026 14:43
maxisbey added a commit that referenced this pull request Mar 13, 2026
Apply the websocket_client pattern from #2266 to the other two
transports: establish the network connection first, create memory
streams only after it succeeds, then own all four stream ends plus
the task group in a single merged async with as the innermost scope.

This eliminates the try/finally + four explicit aclose() calls.
If the connection fails, no streams were ever created — nothing to
clean up. The multi-CM async with unwinds in reverse order on exit,
so tg.__aexit__ waits for cancelled tasks to finish before any
stream end closes.

streamable_http has one outer async with (the AsyncExitStack for the
conditional httpx client), which is clean on all Python versions.

sse has two unavoidable outer layers (httpx_client_factory feeds into
aconnect_sse — data dependency, can't merge). On 3.14, coverage.py's
static analysis sees a phantom branch on the innermost multi-CM line:
each __aexit__ gets a POP_JUMP_IF_TRUE for 'did it suppress the
exception?', which memory streams never do. One targeted pragma on
the line we own, documented inline.

Behavior change: sse_client's ConnectError is no longer wrapped in
an ExceptionGroup, since the task group is never entered when the
connection fails. Updated the regression test to match.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants